import pymysql
from elasticsearch import Elasticsearch


# def get_data():
#     conn = pymysql.connect(host="localhost", port=3306, user="root", password="12345678", database="flask01")
#     cursor = conn.cursor(pymysql.cursors.DictCursor)
#     sql = "select * from xapi_test"
#     cursor.execute(sql)
#     results = cursor.fetchall()
#     conn.close()
#     return results
#
#
# def create_es_data():
#     index_name = "xapi"
#     es = Elasticsearch(
#         ['127.0.0.1:9200'],
#         # 在做任何操作之前，先进行嗅探
#         # sniff_on_start=True,
#         # # 节点没有响应时，进行刷新，重新连接
#         sniff_on_connection_fail=True,
#         # # 每 60 秒刷新一次
#         sniffer_timeout=60
#     )
#
#     # 索引存在，先删除索引
#     if es.indices.exists(index=index_name):
#         es.indices.delete(index=index_name)
#     else:
#         print('索引不存在，可以创建')
#     # 创建索引
#     body = {
#         "settings": {
#             "index": {
#                 "number_of_shards": "1",
#                 "number_of_replicas": "0"
#             }
#         }
#     }
#     es.indices.create(index=index_name, body=body, include_type_name=True)
#     # es.indices.create(index=index_name, body=body)
#
#     # 查看索引的信息
#     print(es.info())
#
#     try:
#         results = get_data()
#         for row in results:
#             print(row)
#             # message = {
#             #     "id": row[0],
#             #     "school": row[1],
#             #     "classname": row[2],
#             #     "unique_number": row[3],
#             #     "student_name": row[4],
#             #     "xapi_name": row[5],
#             #     "start_time": row[6],
#             #     "end_time": row[7],
#             #     "is_major": row[8],
#             #     "description": row[9]
#             # }
#             es.index(index=index_name, doc_type="test-type", document=row)
#             # es.index(index=index_name, document=row)
#     except Exception as e:
#         print("Error:" + str(e))
#
#
# if __name__ == "__main__":
#     # for row in get_data():
#     #     print(row)
#     create_es_data()




from elasticsearch import Elasticsearch


class MyElasticSearch:
    """Thin wrapper around an Elasticsearch client bound to a single index.

    Construction drops the index if it already exists and re-creates it
    with one shard and zero replicas, so every run starts from an empty
    index on the local development cluster.
    """

    def __init__(self, index_name):
        """Connect to the local cluster and (re)create *index_name*.

        :param index_name: name of the index all helper methods operate on.
        """
        self.es = Elasticsearch(['127.0.0.1:9200'],
                                # Re-sniff the node list and reconnect when
                                # a node stops responding.
                                sniff_on_connection_fail=True,
                                # Refresh the sniffed node list every 60 s.
                                sniffer_timeout=60)
        self.index_name = index_name
        # Drop any stale index so repeated runs start from a clean slate.
        if self.es.indices.exists(index=index_name):
            self.es.indices.delete(index=index_name)
        else:
            print('索引不存在，可以创建')
        # Single shard / no replicas: adequate for a single-node dev cluster.
        body = {
            "settings": {
                "index": {
                    "number_of_shards": "1",
                    "number_of_replicas": "0"
                }
            }
        }
        self.es.indices.create(index=index_name, body=body)

        # Print cluster info so the operator can confirm the connection.
        print(self.es.info())

    def insert_one(self, doc):
        """Index a single document (Elasticsearch auto-assigns the id)."""
        self.es.index(index=self.index_name, document=doc)

    def insert_array(self, docs):
        """Index every document in *docs*, one request per document."""
        for doc in docs:
            self.es.index(index=self.index_name, document=doc)

    def update_one(self, doc, uid):
        """Partially update document *uid* with the fields in *doc*.

        The update API requires the new field values wrapped under a
        ``"doc"`` key; passing them bare fails with
        "Validation Failed: ... script or doc is missing".
        """
        self.es.update(index=self.index_name, id=str(uid), body={"doc": doc})

    def delete_one(self, uid):
        """Delete the document with id *uid* from the index."""
        self.es.delete(index=self.index_name, id=uid)


from kafka import KafkaConsumer
import json

# Consume binlog change events from the "message" topic and mirror each
# insert/update/delete into the Elasticsearch index.  Each message value is
# a JSON object with an "action" field plus the affected row data
# (produced by the binlog watcher — see the reference code below).
consumer = KafkaConsumer("message", bootstrap_servers=["localhost:9092"])
es = MyElasticSearch("registered_user")

for mess in consumer:
    event = json.loads(mess.value.decode("utf8"))
    action = event["action"]
    if action == "insert":
        es.insert_one(event["data"])
    elif action == "delete":
        # The event carries the whole row dict; Elasticsearch needs only
        # the document id (mirrors how the update branch extracts "id").
        es.delete_one(event["data"]["id"])
    elif action == "update":
        es.update_one(event["after_values"], uid=event["after_values"]["id"])







# # Monitor the MySQL binlog (commented-out reference implementation of the event producer)
# import datetime
#
# from pymysqlreplication import BinLogStreamReader
# from pymysqlreplication.row_event import (
#     DeleteRowsEvent,
#     UpdateRowsEvent,
#     WriteRowsEvent,
# )
# import json
# import sys
#
# MYSQL_SETTINGS = {
#     "host": "localhost",
#     "user": "root",
#     "password": "12345678"
# }
# stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, server_id=1, blocking=True, only_schemas=["flask01"],
#                             only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
# class DateEncoder(json.JSONEncoder):
#     def default(self, obj):
#         if isinstance(obj,datetime.datetime):
#             return obj.strftime("%Y-%m-%d %H:%M:%S")
#         else:
#             return json.JSONEncoder.default(self,obj)
#
#
# for binlogevent in stream:
#     for row in binlogevent.rows:
#         event = {"schema": binlogevent.schema, "table": binlogevent.table}
#         if isinstance(binlogevent, DeleteRowsEvent):
#             event["action"] = "delete"
#             event["data"] = row["values"]
#         elif isinstance(binlogevent, WriteRowsEvent):
#             event["action"] = "insert"
#             event["data"] = row["values"]
#         elif isinstance(binlogevent, UpdateRowsEvent):
#             event["action"] = "update"
#             event["before_values"] = dict(row["before_values"].items())
#             event["after_values"] = dict(row["after_values"].items())
#         print(json.dumps(event, ensure_ascii=False,cls=DateEncoder))
#         sys.stdout.flush()
# stream.close()
